package com.hzya.frame;

import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

@Tags({"oracle", "database", "dba", "tablespace", "auto", "datafile"})
@CapabilityDescription("监控Oracle表空间的使用情况，并在使用率超过指定阈值时自动添加新的数据文件。" + "同时，它会检查磁盘空间和数据文件的AUTOEXTENSIBLE属性，并收集潜在问题的告警信息。")
@SeeAlso({})
@ReadsAttributes({})
@WritesAttributes({@WritesAttribute(attribute = "autoadd.oracle.datafile.alerts", description = "包含在检查过程中生成的告警信息。多条信息将以换行符分隔。")})
@InputRequirement(Requirement.INPUT_ALLOWED)
public class DevAutoAddOracleDatafileProcessor extends AbstractProcessor {

    // 处理器属性定义
    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder().name("dbcp-service").displayName("数据库连接池服务").description("提供Oracle数据库连接的控制器服务。").required(true).identifiesControllerService(DBCPService.class).build();

    public static final PropertyDescriptor TABLESPACE_NAMES = new PropertyDescriptor.Builder().name("tablespace-names").displayName("表空间名称").description("需要监控的Oracle表空间名称列表，使用逗号分隔 (例如：USERS,DATA01)。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();

    public static final PropertyDescriptor USAGE_THRESHOLD = new PropertyDescriptor.Builder().name("usage-threshold").displayName("使用率阈值 (%)").description("触发添加新数据文件的表空间使用率百分比 (1-99)。例如，填入 80。").required(true).defaultValue("80").addValidator(StandardValidators.createLongValidator(1, 99, true)).build();

    public static final PropertyDescriptor DATAFILE_PATH = new PropertyDescriptor.Builder().name("datafile-path").displayName("新数据文件路径").description("用于创建新数据文件的目录的绝对路径。请确保运行Oracle的用户对此目录有写入权限。" + "同时支持Linux (例如 /u01/app/oracle/oradata/ORCL/) 和 Windows (例如 C:\\oracle\\oradata\\ORCL\\) 路径。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();

    //    public static final PropertyDescriptor NEW_DATAFILE_SIZE = new PropertyDescriptor.Builder().name("new-datafile-size").displayName("新数据文件大小").description("要创建的新数据文件的大小 (例如, 1G, 512M, 2048K)。").required(true).defaultValue("100M").addValidator(StandardValidators.DATA_SIZE_VALIDATOR).build();
    public static final PropertyDescriptor NEW_DATAFILE_SIZE = new PropertyDescriptor.Builder().name("new-datafile-size").displayName("新数据文件大小").description("要创建的新数据文件的大小 (例如, 1G, 512M, 2048K)。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("100M").build();

    public static final PropertyDescriptor DISK_USAGE_THRESHOLD = new PropertyDescriptor.Builder().name("disk-usage-threshold").displayName("磁盘使用率阈值 (%)").description("如果数据文件路径所在磁盘的使用率超过此百分比(1-99)，将生成告警并且不会添加数据文件。").required(true).defaultValue("90").addValidator(StandardValidators.createLongValidator(1, 99, true)).build();

    // 关系定义
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("检查成功完成后（无论是否添加了数据文件），FlowFile将路由到此关系。").build();

    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("当发生不可恢复的错误时（如数据库连接失败），FlowFile将路由到此关系。").build();

    public static final Relationship REL_ALERT = new Relationship.Builder().name("alert").description("当检测到潜在问题时（如磁盘使用率过高、AUTOEXTEND关闭等），包含告警信息的新FlowFile将被路由到此关系。").build();

    private List<PropertyDescriptor> descriptors;
    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(DBCP_SERVICE);
        descriptors.add(TABLESPACE_NAMES);
        descriptors.add(USAGE_THRESHOLD);
        descriptors.add(DATAFILE_PATH);
        descriptors.add(NEW_DATAFILE_SIZE);
        descriptors.add(DISK_USAGE_THRESHOLD);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        relationships.add(REL_ALERT);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        // 如果没有输入FlowFile，则创建一个新的来触发执行逻辑
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            flowFile = session.create();
        }

        final long usageThreshold = context.getProperty(USAGE_THRESHOLD).asLong();
        final long diskUsageThreshold = context.getProperty(DISK_USAGE_THRESHOLD).asLong();
        final String datafileBasePath = context.getProperty(DATAFILE_PATH).getValue();
        final String newDatafileSize = context.getProperty(NEW_DATAFILE_SIZE).getValue();
        final List<String> alertMessages = new ArrayList<>();

        // 从属性中获取逗号分隔的表空间名称字符串，并转换为列表
        final String rawTablespaceNames = context.getProperty(TABLESPACE_NAMES).getValue();
        final List<String> tablespaceNames = Arrays.stream(rawTablespaceNames.split(",")).map(String::trim) // 去除每个表空间名称前后的空格
                .filter(name -> !name.isEmpty()) // 过滤掉可能因 "a,,b" 这种格式产生的空字符串
                .collect(Collectors.toList());

        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);

        // 步骤 1: 检查数据文件目录的磁盘空间
        // 此检查方法兼容 Windows 和 Linux 操作系统
        try {
            Path path = Paths.get(datafileBasePath);
            if (!Files.exists(path)) {
                throw new IOException("数据文件路径不存在: " + datafileBasePath);
            }
            FileStore store = Files.getFileStore(path);
            long totalSpace = store.getTotalSpace();
            long usableSpace = store.getUsableSpace();
            long usedSpace = totalSpace - usableSpace;
            // 处理 totalSpace 为 0 的情况，避免除零异常
            double currentDiskUsage = totalSpace > 0 ? (double) usedSpace / totalSpace * 100 : 0;

            String diskUsageMsg = String.format("磁盘路径 '%s' 空间耗用: %.2f%% (已用: %d GB, 总计: %d GB)", datafileBasePath, currentDiskUsage, usedSpace / 1024 / 1024 / 1024, totalSpace / 1024 / 1024 / 1024);
            getLogger().info(diskUsageMsg); // 关键信息：记录磁盘耗用日志

            if (currentDiskUsage >= diskUsageThreshold) {
                String alert = String.format("严重告警: 磁盘路径 '%s' 的空间使用率 %.2f%%, 已超过设定的阈值 %d%%。", datafileBasePath, currentDiskUsage, diskUsageThreshold);
                alertMessages.add(alert);
                getLogger().error(alert); // 关键信息：记录到日志
            }
        } catch (IOException e) {
            getLogger().error("检查磁盘空间失败，路径: '{}'。请检查路径是否正确以及NiFi运行用户是否有权访问。", new Object[]{datafileBasePath}, e);
            session.transfer(session.penalize(flowFile), REL_FAILURE);
            return;
        }

        // 步骤 2: 连接数据库并对每个表空间进行检查
        try (final Connection conn = dbcpService.getConnection(flowFile.getAttributes())) {
            // 循环处理每一个配置的表空间
            for (String tablespaceName : tablespaceNames) {
                tablespaceName = tablespaceName.trim().toUpperCase();
                List<String> currentTbsAlerts = new ArrayList<>(); // 用于收集当前循环中表空间的告警信息

                // 步骤 2a: 验证表空间数据文件是否可以自动扩容 (AUTOEXTENSIBLE)
                String autoextendSql = "SELECT file_name, autoextensible FROM dba_data_files WHERE tablespace_name = ?";
                try (PreparedStatement ps = conn.prepareStatement(autoextendSql)) {
                    ps.setString(1, tablespaceName);
                    ResultSet rs = ps.executeQuery();
                    boolean hasNonAutoensibleFile = false;
                    while (rs.next()) {
                        if ("NO".equalsIgnoreCase(rs.getString("autoextensible"))) {
                            hasNonAutoensibleFile = true;
                            getLogger().warn("表空间 '{}' 的数据文件 '{}' 被设置为 AUTOEXTENSIBLE=NO。", new Object[]{tablespaceName, rs.getString("file_name")});
                        }
                    }
                    if (hasNonAutoensibleFile) {
                        currentTbsAlerts.add(String.format("告警: 表空间 '%s' 中存在一个或多个数据文件的 AUTOEXTENSIBLE 属性为 'NO'。", tablespaceName));
                    }
                }

                // 步骤 2b: 使用您提供的SQL1逻辑来查询表空间使用率 (兼容Oracle 11g, 12c, 19c)
                String usageSql = "SELECT used_gb, current_gb, used_pct_max FROM (" + "SELECT df.tablespace_name, " + "ROUND( ( SUM( df.bytes ) - SUM( NVL(fs.free_bytes, 0) ) ) / 1024 / 1024 / 1024, 2 ) AS used_gb, " + "ROUND( SUM( df.bytes ) / 1024 / 1024 / 1024, 2 ) AS current_gb, " + "CASE WHEN SUM(df.maxbytes) = 0 THEN 100 ELSE ROUND( ( SUM( df.bytes ) - SUM( NVL(fs.free_bytes, 0) ) ) / SUM( df.maxbytes ) * 100, 2 ) END AS used_pct_max " + "FROM dba_data_files df LEFT JOIN ( SELECT tablespace_name, file_id, SUM( bytes ) AS free_bytes " + "FROM dba_free_space GROUP BY tablespace_name, file_id ) fs ON df.file_id = fs.file_id " + "GROUP BY df.tablespace_name) " + "WHERE tablespace_name = ?";

                try (PreparedStatement ps = conn.prepareStatement(usageSql)) {
                    ps.setString(1, tablespaceName);
                    ResultSet rs = ps.executeQuery();
                    if (rs.next()) {
                        // 读取大数字时使用 getBigDecimal 保证精度，然后转换为 double
                        double usedPct = rs.getBigDecimal("used_pct_max").doubleValue();
                        double usedGb = rs.getBigDecimal("used_gb").doubleValue();
                        double currentGb = rs.getBigDecimal("current_gb").doubleValue();

                        String usageLogMsg = String.format("表空间 '%s' 空间耗用: %.2f%% (已用: %.2f GB, 当前总大小: %.2f GB)", tablespaceName, usedPct, usedGb, currentGb);
                        getLogger().info(usageLogMsg); // 关键信息：记录表空间耗用日志

                        if (usedPct > usageThreshold) {
                            // 表空间使用率已超过阈值
                            String thresholdAlert = String.format("告警: 表空间 '%s' 使用率 %.2f%%, 已超过设定的阈值 %d%%。", tablespaceName, usedPct, usageThreshold);
                            currentTbsAlerts.add(thresholdAlert);
                            getLogger().warn(thresholdAlert);

                            // 再次确认磁盘空间是否超限，如果已超限，则不执行添加操作
                            if (alertMessages.stream().anyMatch(m -> m.startsWith("严重告警: 磁盘路径"))) {
                                getLogger().error("由于磁盘空间使用率已超阈值，跳过为表空间 '{}' 添加数据文件的操作。", new Object[]{tablespaceName});
                            } else {
                                // 执行添加数据文件的逻辑
                                String timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
                                String newFileName = String.format("%s_%s.dbf", tablespaceName, timestamp); // 数据文件命名规则
                                String fullPath = new File(datafileBasePath, newFileName).getPath();
                                // 为Windows系统处理路径分隔符
                                if (System.getProperty("os.name").toLowerCase().contains("win")) {
                                    fullPath = fullPath.replace('\\', '/');
                                }

                                String addDatafileSql = String.format("ALTER TABLESPACE %s ADD DATAFILE '%s' SIZE %s AUTOEXTEND ON NEXT 50M", tablespaceName, fullPath, newDatafileSize);

                                getLogger().info("准备执行SQL语句以添加数据文件: {}", new Object[]{addDatafileSql}); // 关键信息：记录创建数据文件的SQL

                                try (Statement stmt = conn.createStatement()) {
                                    stmt.execute(addDatafileSql);
                                    getLogger().info("成功为表空间 '{}' 添加数据文件 '{}'。", new Object[]{tablespaceName, fullPath});
                                } catch (Exception e) {
                                    String addFileError = String.format("严重告警: 为表空间 '%s' 添加数据文件失败。SQL: %s", tablespaceName, addDatafileSql);
                                    getLogger().error(addFileError, e);
                                    currentTbsAlerts.add(addFileError + " - 错误详情: " + e.getMessage());
                                }
                            }
                        }
                    } else {
                        String notFoundAlert = String.format("警告: 未找到表空间 '%s'，或相关的空间使用率查询没有返回数据。", tablespaceName);
                        getLogger().warn(notFoundAlert);
                        currentTbsAlerts.add(notFoundAlert);
                    }
                }
                // 将当前表空间的所有告警信息统一添加到主告警列表中
                alertMessages.addAll(currentTbsAlerts);
            }

        } catch (Exception e) {
            getLogger().error("处理Oracle表空间检查时发生错误: {}", e.getMessage(), e);
            session.transfer(session.penalize(flowFile), REL_FAILURE);
            return;
        }

        // 步骤 3: 如果收集到了任何告警信息，则创建一个新的FlowFile并路由到 'alert' 关系
        if (!alertMessages.isEmpty()) {
            FlowFile alertFlowFile = session.create(flowFile);
            // 将所有告警信息用换行符连接成一个字符串
            String alertContent = String.join("\n", alertMessages);
            alertFlowFile = session.write(alertFlowFile, out -> out.write(alertContent.getBytes(StandardCharsets.UTF_8)));
            // 同时将告警信息写入FlowFile的属性中，方便下游处理器直接使用
            alertFlowFile = session.putAttribute(alertFlowFile, "autoadd.oracle.datafile.alerts", alertContent);
            session.transfer(alertFlowFile, REL_ALERT);
        }

        // 步骤 4: 将原始的(或新创建的)FlowFile路由到 'success' 关系
        session.transfer(flowFile, REL_SUCCESS);
    }
}